一、分布式事务概述
由微服务中的多个系统通过网络交互协作完成的操作构成事务是分布式事务
1.1 分布式事务应用架构
单一服务分布式事务 atomikos
单服务-多库
解决方案:(全局事务管理器TM)
分库分表
数据量大时,分库分表
解决方案:中间件+数据库同步技术
分库分表中间件:mycat \shardingdb
多服务多数据源分布式事务
多数据库--多服务:
解决方案:全局TM(数据库) 和 服务协调者(服务),共同协调\数据库资源\服务
解决方案的分类
刚性事务:ACID(本地事务)
柔性事务:(CAP\BASE)(分布式事务管理)
分布式理论
1.2 CAP理论
分布式系统在设计时只能在一致性(Consistency)、可用性(Availability)、分区容忍性(Partition Tolerance)中满足两种,无法兼顾三种。
一致性(Consistency):多个服务存储同一份数据, 而且数据需要保持同一时刻数据一致性。
可用性(Availability):多个服务结点,其中一个结点宕机不影响整个集群对外提供服务,如:只有服务A结点时,当服务A宕机整个系统将无法提供服务,增加服务B、C是为了保证系统的可用性。
分区容忍性(Partition Tolerance):分区容忍性就是允许系统通过网络协同工作,分区容忍性要解决由于网络分区导致数据的不完整及无法访问等问题。 分布式系统不可避免的出现了多个系统通过网络协同工作的场景,结点之间难免会出现网络中断、网延延迟等现象,这种现象一旦出现就导致数据被分散在不同的结点上,这就是网络分区。
分布式事务中向满足三者是不可能的,有以下几种组合方式
1、CA:放弃分区容忍性,加强一致性和可用性,关系数据库按照CA进行设计。
2、AP:放弃一致性,加强可用性和分区容忍性,追求最终一致性,很多NoSQL数据库按照AP进行设计。
说明:这里放弃一致性是指放弃强一致性,强一致性就是写入成功立刻要查询出最新数据。追求最终一致性是指允许暂时的数据不一致,只要最终在用户接受的时间内数据 一致即可。
3、CP:放弃可用性,加强一致性和分区容忍性,一些强一致性要求的系统按CP进行设计,比如跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。
说明:由于网络问题的存在CP系统可能会出现待等待超时,如果没有处理超时问题则整理系统会出现阻塞。 总结:
在分布式系统设计中AP的应用较多,即保证分区容忍性和可用性,牺牲数据的强一致性(写操作后立刻读取到最新数据),保证数据最终一致性。比如:订单退款,今日退款成功,明日账户到账,只要在预定的用户可以接受的时间内退款事务走完即可。
1.3 BASE理论
核心思想:分布式事务,只需达到最终一致性(Eventual Consitency)即可。
Basically Available(基本可用)分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。
Soft state(软状态):比如订单状态:(待付款\已付款\发货\已签收\已结束) 由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用 性,如订单的"支付中"、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。
Eventually consistent(最终一致性)最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变 为“支付成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。
BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证 核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。
1.4 柔性事务方案
TCC(两阶段型、补偿型) 案例:(定婚-领证)
可靠消息最终一致性(异步确保型)消息队列来保证事务的一致性
非事务型消息中间件(activimq\rabbitmq\kafka)
事务性消息rocketmq(阿里的消息队列) - 案
例:(在线下棋)
3)最大努力通知(非可靠消息、定期校对) - 案例:滴滴打车有未付款的订单,每个一段时间滴滴都会通知我们付款. 滴滴最大努力通知用户;用户定期校验.如果未付款,则执行付款,付款后 滴滴不再重复通知.数据状态保持一致.
二、数据一致性
2.1 两阶段提交协议(2PC)
两阶段提交由协调者和参与者组成,共经过两个阶段和三个操作,部分关系数据库如Oracle、MySQL支持两阶段提交协议。
1)第一阶段:准备阶段(prepare)
协调者通知参与者准备提交订单,参与者开始投票。
协调者完成准备工作向协调者回应Yes。
2)第二阶段:提交(commit)/回滚(rollback)阶段
协调者根据参与者的投票结果发起最终的提交指令。
如果有参与者没有准备好则发起回滚指令。
流程:
1、应用程序连接两个数据源。
2、应用程序通过事务协调器向两个库发起prepare,两个数据库收到消息分别执行本地事务(记录日志),但不提交,如果执行成功则回复yes,否则回复no。
3、事务协调器收到回复,只要有一方回复no则分别向参与者发起回滚事务,参与者开始回滚事务。
4、事务协调器收到回复,全部回复yes,此时向参与者发起提交事务。如果参与者有一方提交事务失败则由事务协调器发起回滚事务。
优缺点
优点:实现强一致性,部分关系数据库支持(Oracle、MySQL等)。
缺点:整个事务的执行需要由协调者在多个节点之间去协调,增加了事务的执行时间,性能低下。 解决方案有:springboot+Atomikos or Bitronix
XA方案
2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对 接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义了分布式事务处理模型 DTP(Distributed Transaction Processing Reference Model)。
以新用户注册送积分为例:
执行流程如下:
1、应用程序(AP)持有用户库和积分库两个数据源。
2、应用程序(AP)通过TM通知用户库RM新增用户,同时通知积分库RM为该用户新增积分,RM此时并未提交事 务,此时用户和积分资源锁定。
3、TM收到执行回复,只要有一方失败则分别向其他RM发起回滚事务,回滚完毕,资源锁释放。
4、TM收到执行回复,全部成功,此时向所有RM发起提交事务,提交完毕,资源锁释放。
DTP模型定义如下角色:
AP(Application Program):即应用程序,可以理解为使用DTP分布式事务的程序。
RM(Resource Manager):即资源管理器,可以理解为事务的参与者,一般情况下是指一个数据库实例,通过 资源管理器对该数据库进行控制,资源管理器控制着分支事务。
TM(Transaction Manager):事务管理器,负责协调和管理事务,事务管理器控制着全局事务,管理事务生命 周期,并协调各个RM。全局事务是指分布式事务处理环境中,需要操作多个数据库共同完成一个工作,这个 工作即是一个全局事务。
DTP模型定义TM和RM之间通讯的接口规范叫XA,简单理解为数据库提供的2PC接口协议,基于数据库的XA 协议来实现2PC又称为XA方案。
以上三个角色之间的交互方式如下:
1)TM向AP提供 应用程序编程接口,AP通过TM提交及回滚事务。
2)TM交易中间件通过XA接口来通知RM数据库事务的开始、结束以及提交、回滚等。
总结:
整个2PC的事务流程涉及到三个角色AP、RM、TM。AP指的是使用2PC分布式事务的应用程序;RM指的是资 源管理器,它控制着分支事务;TM指的是事务管理器,它控制着整个全局事务。
1)在准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;
2)在提交阶段TM会接受RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操 作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放。
XA方案的问题:
1、需要本地数据库支持XA协议。
2、资源锁需要等到两个阶段结束才释放,性能较差。
Seata方案
Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。
传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作 在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服 务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。
Seata的设计思想如下:
Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决 2PC方案面临的问题。
Seata把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务 达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务
与 传统2PC 的模型类似,Seata定义了3个组件来协议分布式事务的处理过程:
Transaction Coordinator (TC): 事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运 行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各各分支事务的提交或回滚。
Transaction Manager (TM): 事务管理器,TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终 向TC发起全局提交或全局回滚的指令。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器TC的指令,驱动分 支(本地)事务的提交和回滚。
还拿新用户注册送积分举例Seata的分布式事务过程:
具体的执行流程如下:
- 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
- 用户服务的 RM 向 TC 注册 分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局 事务的管辖。
- 用户服务执行分支事务,向用户表插入一条记录。
- 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的RM 向 TC 注册分支事 务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。
- 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。
- 用户服务分支事务执行完毕。
- TM 向 TC 发起针对 XID 的全局提交或回滚决议。
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求
Seata实现2PC与传统2PC的差别:
架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata的 RM 是以jar包的形式作为中间件层部署在应用程序这一侧的。
两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成 才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。
seata实现2PC事务
Seata执行流程
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目 的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有 undo_log。
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分 支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个 分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各 各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日 志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操 作。
Seata实现2PC要点: 1、全局事务开始使用 @GlobalTransactional标识 。 2、每个本地事务方案仍然使用@Transactional标识。 3、每个数据都需要创建undo_log表,此表是seata保证本地事务一致性的关键。
2.2 事务补偿TCC
TCC事务补偿是基于2PC实现的业务层事务控制方案,它是Try、Confirm和Cancel三个单词的首字母,含义如下:
1、Try 检查及预留业务资源
完成提交事务前的检查,并预留好资源。这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源
2、Confirm 确定执行业务操作
对try阶段预留的资源正式执行。确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC则认为 Confifirm阶段是不会出错的。即:只要Try成功,Confifirm一定成功。若Confifirm阶段真的出错了,需引入重试机制或人工处理。
3、Cancel 取消执行业务操作
对try阶段预留的资源释放。取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理
TM事务管理器
TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公 用组件,是为了考虑系统结构和软件复用。 TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文, 追踪和记录状态,由于Confirm 和cancel失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求 多少次,其结果都相同。
流程举例:
1、Try
下单业务由订单服务和库存服务协同完成,在try阶段订单服务和库存服务完成检查和预留资源。
订单服务检查当前是否满足提交订单的条件(比如:当前存在未完成订单的不允许提交新订单)。
库存服务检查当前是否有充足的库存,并锁定资源。
2、Confirm
订单服务和库存服务成功完成Try后开始正式执行资源操作。
订单服务向订单写一条订单信息。
库存服务减去库存。
3、Cancel
如果订单服务和库存服务有一方出现失败则全部取消操作。
订单服务需要删除新增的订单信息。
库存服务将减去的库存再还原。
优缺点
优点:最终保证数据的一致性,在业务层实现事务控制,灵活性好。
缺点:开发成本高,每个事务操作每个参与者都需要实现try/confirm/cancel三个接口。
注意:TCC的try/confirm/cancel接口都要实现幂等性,在为在try、confirm、cancel失败后要不断重试。
什么是幂等性?
幂等性是指同一个操作无论请求多少次,其结果都相同。
幂等操作实现方式有:
1、操作之前在业务方法进行判断如果执行过了就不再执行。
2、缓存所有请求和处理的结果,已经处理的请求则直接返回结果。
3、在数据库表中加一个状态字段(未处理,已处理),数据操作时判断未处理时再处理。
解决方案
目前市面上的TCC框架众多比如下面这几种:
Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC的原 理以及事务协调运作的过程,因此更请倾向于轻量级易于理解的框架,因此最终确定了Hmily。 Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等 RPC框架进行分布式事务。它目前支持以下特性:
- 支持嵌套事务(Nested transaction support).
- 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
- 支持SpringBoot-starter 项目启动,使用简单。
- RPC框架支持 : dubbo,motan,springcloud。
- 本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。
- 事务日志序列化支持 :java,hessian,kryo,protostuff。
- 采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。
- RPC事务恢复,超时异常恢复等
Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的 调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。
Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存 储。
Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务 逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架 发现即可,不需要被调用它的其他业务服务所感知。
官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html
TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:
空回滚:
在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回 滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶 段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回 滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分 布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会 插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存 在,则是空回滚。
幂等:
通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、 Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据 不一致等严重问题。
解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。
悬挂:
悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。
出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵, 通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求 才到达参与者真正执行,而一个 Try 方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预 留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。
解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支 事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
Hmily实现TCC事务
依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily‐springcloud</artifactId>
<version>2.0.4‐RELEASE</version>
</dependency>配置hmily
server:
port: 56081
eureka:
client:
serviceUrl:
defaultZone: http://localhost:56080/eureka/
spring:
##################### DB #####################
datasource:
ds0:
url: jdbc:mysql://localhost:3306/bank1?useUnicode=true
username: root
password: mysql
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT user()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
connection-properties: druid.stat.mergeSql:true;druid.stat.slowSqlMillis:5000
org:
dromara:
hmily :
serializer : kryo
recoverDelayTime : 30
retryMax : 30
scheduledDelay : 30
scheduledThreadMax : 10
repositorySupport : db
started: true
hmilyDbConfig :
driverClassName : com.mysql.jdbc.Driver
url : jdbc:mysql://localhost:3306/hmily?useUnicode=true
username : root
password : mysql
logging:
level:
root: info
org.springframework.web: info
org.apache.ibatis: info
org.dromara.hmily.bonuspoint: debug
org.dromara.hmily.lottery: debug
org.dromara.hmily: debug
io.netty: info
cn.itcast.wanxintx.seatademo.bank2: debug并创建HmilyTransactionBootstrap Bean
@Bean
public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){
HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmi ly.recoverDelayTime")));
hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retry Max")));
hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily .scheduledDelay")));
hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.h mily.scheduledThreadMax")));
hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySup port"));
hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.st arted")));
HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();
hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassNa me"));
hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));
hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));
hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
return hmilyTransactionBootstrap;
}启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项
@SpringBootApplication
@EnableDiscoveryClient
@EnableHystrix
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"})
@ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"})
public class Bank1HmilyServer {
public static void main(String[] args) {
SpringApplication.run(Bank1HmilyServer.class, args);
}
}try和catch
package cn.itcast.dtx.tccdemo.bank1.service.impl;
import cn.itcast.dtx.tccdemo.bank1.dao.AccountInfoDao;
import cn.itcast.dtx.tccdemo.bank1.service.AccountInfoService;
import cn.itcast.dtx.tccdemo.bank1.spring.Bank2Client;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hmily.annotation.Hmily;
import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
Bank2Client bank2Client;
// 账户扣款,就是tcc的try方法
/**
* try幂等校验
* try悬挂处理
* 检查余额是够扣减金额
* 扣减金额
* @param accountNo
* @param amount
*/
@Override
@Transactional
//只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
@Hmily(confirmMethod="commit",cancelMethod="rollback")
public void updateAccountBalance(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 try begin 开始执行...xid:{}",transId);
//幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行
if(accountInfoDao.isExistTry(transId)>0){
log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId);
return ;
}
//try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){
log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}",transId);
return ;
}
//扣减金额
if(accountInfoDao.subtractAccountBalance(accountNo, amount)<=0){
//扣减失败
throw new RuntimeException("bank1 try 扣减金额失败,xid:{}"+transId);
}
//插入try执行记录,用于幂等判断
accountInfoDao.addTry(transId);
//远程调用李四,转账
if(!bank2Client.transfer(amount)){
throw new RuntimeException("bank1 远程调用李四微服务失败,xid:{}"+transId);
}
if(amount == 2){
throw new RuntimeException("人为制造异常,xid:{}"+transId);
}
log.info("bank1 try end 结束执行...xid:{}",transId);
}
//confirm方法
@Transactional
public void commit(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}",transId,accountNo,amount);
}
/** cancel方法
* cancel幂等校验
* cancel空回滚处理
* 增加可用余额
* @param accountNo
* @param amount
*/
@Transactional
public void rollback(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 cancel begin 开始执行...xid:{}",transId);
// cancel幂等校验
if(accountInfoDao.isExistCancel(transId)>0){
log.info("bank1 cancel 已经执行,无需重复执行,xid:{}",transId);
return ;
}
//cancel空回滚处理,如果try没有执行,cancel不允许执行
if(accountInfoDao.isExistTry(transId)<=0){
log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}",transId);
return ;
}
// 增加可用余额
accountInfoDao.addAccountBalance(accountNo,amount);
//插入一条cancel的执行记录
accountInfoDao.addCancel(transId);
log.info("bank1 cancel end 结束执行...xid:{}",transId);
}
}feignClient
package cn.itcast.dtx.tccdemo.bank1.spring;
import org.dromara.hmily.annotation.Hmily;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* Created by Administrator.
*/
@FeignClient(value="tcc-demo-bank2",fallback=Bank2ClientFallback.class)
public interface Bank2Client {
//远程调用李四的微服务
@GetMapping("/bank2/transfer")
@Hmily
public Boolean transfer(@RequestParam("amount") Double amount);
}服务2实现confirm方法
package cn.itcast.dtx.tccdemo.bank2.service.impl;
import cn.itcast.dtx.tccdemo.bank2.dao.AccountInfoDao;
import cn.itcast.dtx.tccdemo.bank2.service.AccountInfoService;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hmily.annotation.Hmily;
import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Override
@Hmily(confirmMethod="confirmMethod", cancelMethod="cancelMethod")
public void updateAccountBalance(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 try begin 开始执行...xid:{}",transId);
}
/**
* confirm方法
* confirm幂等校验
* 正式增加金额
* @param accountNo
* @param amount
*/
@Transactional
public void confirmMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 confirm begin 开始执行...xid:{}",transId);
if(accountInfoDao.isExistConfirm(transId)>0){
log.info("bank2 confirm 已经执行,无需重复执行...xid:{}",transId);
return ;
}
//增加金额
accountInfoDao.addAccountBalance(accountNo,amount);
//增加一条confirm日志,用于幂等
accountInfoDao.addConfirm(transId);
log.info("bank2 confirm end 结束执行...xid:{}",transId);
}
/**
* @param accountNo
* @param amount
*/
public void cancelMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 cancel begin 开始执行...xid:{}",transId);
}
}如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confifirm、cancel三个操作。此 外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
2.3 消息队列实现最终一致
可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用消息中间件完成,如下图:
事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件 之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题
可靠消息最终一致性方案要解决以下几个问题:
本地事务与消息发送的原子性问题
本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实 现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最 终一致性方案的关键问题。
事务参与方接收消息的可靠性
事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
消息重复消费的问题
由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重 复消费。
要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
解决方案
本地消息表方案
本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后 通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。
定时任务扫描日志
如何保证将消息发送给消息队列呢?
经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息 中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
消费消息
如何保证消费者一定能消费到消息呢? 这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重 试向消费者来发送消息
实现步骤
第一步: 消息由系统A投递到中间件
在系统A处理任务A前,首先向消息中间件发送一条消息
消息中间件收到后将该条消息持久化,但并不投递。持久化成功后,向A回复一个确认应答
系统A收到确认应答后,则可以开始处理任务A 4. 任务A处理完成后,向消息中间件发送Commit或者Rollback请求。该请求发送完成后,对系统A而 言,该事务的处理过程就结束了
如果消息中间件收到Commit,则向B系统投递消息;如果收到Rollback,则直接丢弃消息。但是 如果消息中间件收不到Commit和Rollback指令,那么就要依靠"超时询问机制"。
超时询问机制 系统A除了实现正常的业务流程外,还需提供一个事务询问的接口,供消息中间件调用。当消息中间件收到发布消息便开始计时,如果到了超时没收到确认指令,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果,中间件根据三种结果做出不同反应: 提交:将该消息投递给系统B 回滚:直接将条消息丢弃 处理中:继续等待
第二步: 消息由中间件投递到系统B 消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便立即进行任务的处理,任务处理完成后便向消息中间件返回应答。 如果消息中间件收到确认应答后便认为该事务处理完毕 如果消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。 一般消息中间件可以设置消息重试的次数和时间间隔,如果最终还是不能成功投递,则需要手工干预。这里之所以使用人工干预,而不是使用让A系统回滚,主要是考虑到整个系统设计的复杂度问题。
RocketMQ事务消息方案
RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的 设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系 统发生异常时依然能够保证达成事务的最终一致性。
在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ 内部,解决 Producer 端的消息发送与本地事务执行的原子性问题
执行流程如下:
为方便理解我们还以注册送积分的例子来描述 整个流程。
Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责 新增积分。
1、Producer 发送事务消息
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注 意此时这条消息消费者(MQ订阅方)是无法消费到的。
本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
3、Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
本例中,Producer 执行添加用户操作。
4、消息投递
若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积 分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。
MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
5、事务回查
如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此 只需关注本地事务的执行状态即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener {
/**
‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}发送事务消息: 以下是RocketMQ提供用于发送事务消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); //设置TransactionListener实现
producer.setTransactionListener(transactionListener);
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);RocketMQ实现可靠消息最终一致性事务
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>配置
rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876业务
package cn.itcast.dtx.txmsgdemo.bank1.service.impl;
import cn.itcast.dtx.txmsgdemo.bank1.dao.AccountInfoDao;
import cn.itcast.dtx.txmsgdemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank1.service.AccountInfoService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
RocketMQTemplate rocketMQTemplate;
//向mq发送转账消息
@Override
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
//将accountChangeEvent转成json
JSONObject jsonObject =new JSONObject();
jsonObject.put("accountChange",accountChangeEvent);
String jsonString = jsonObject.toJSONString();
//生成message类型
Message<String> message = MessageBuilder.withPayload(jsonString).build();
//发送一条事务消息
/**
* String txProducerGroup 生产组
* String destination topic,
* Message<?> message, 消息内容
* Object arg 参数
*/
rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);
}
//更新账户,扣减金额
@Override
@Transactional
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
//幂等判断
if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
return ;
}
//扣减金额
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);
//添加事务日志
accountInfoDao.addTx(accountChangeEvent.getTxNo());
if(accountChangeEvent.getAmount() == 3){
throw new RuntimeException("人为制造异常");
}
}
}本地事务
RocketMQLocalTransactionListener
package cn.itcast.dtx.txmsgdemo.bank1.message;
import cn.itcast.dtx.txmsgdemo.bank1.dao.AccountInfoDao;
import cn.itcast.dtx.txmsgdemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank1.service.AccountInfoService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
@Autowired
AccountInfoService accountInfoService;
@Autowired
AccountInfoDao accountInfoDao;
//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
//解析message,转成AccountChangeEvent
String messageString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String accountChangeString = jsonObject.getString("accountChange");
//将accountChange(json)转成AccountChangeEvent
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//执行本地事务,扣减金额
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//事务状态回查,查询是否扣减金额
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//解析message,转成AccountChangeEvent
String messageString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String accountChangeString = jsonObject.getString("accountChange");
//将accountChange(json)转成AccountChangeEvent
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//事务id
String txNo = accountChangeEvent.getTxNo();
int existTx = accountInfoDao.isExistTx(txNo);
if(existTx>0){
return RocketMQLocalTransactionState.COMMIT;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}服务2接收消息
package cn.itcast.dtx.txmsgdemo.bank2.service.impl;
import cn.itcast.dtx.txmsgdemo.bank2.dao.AccountInfoDao;
import cn.itcast.dtx.txmsgdemo.bank2.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank2.service.AccountInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
//更新账户,增加金额
@Override
@Transactional
public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
return ;
}
//增加金额
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//添加事务记录,用于幂等
accountInfoDao.addTx(accountChangeEvent.getTxNo());
if(accountChangeEvent.getAmount() == 4){
throw new RuntimeException("人为制造异常");
}
}
}MQ监听类
package cn.itcast.dtx.txmsgdemo.bank2.message;
import cn.itcast.dtx.txmsgdemo.bank2.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank2.service.AccountInfoService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @version 1.0
**/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg")
public class TxmsgConsumer implements RocketMQListener<String> {
@Autowired
AccountInfoService accountInfoService;
//接收消息
@Override
public void onMessage(String message) {
log.info("开始消费消息:{}",message);
//解析消息
JSONObject jsonObject = JSONObject.parseObject(message);
String accountChangeString = jsonObject.getString("accountChange");
//转成AccountChangeEvent
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//设置账号为李四的
accountChangeEvent.setAccountNo("2");
//更新本地账户,增加金额
accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RocketMQ作为 消息中间件,RocketMQ主要解决了两个功能:
1、本地事务与消息发送的原子性问题。
2、事务参与方接收消息的可靠性。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。
流程举例
1、订单服务和库存服务完成检查和预留资源。
2、订单服务在本地事务中完成添加订单表记录和添加“减少库存任务消息”。
3、由定时任务根据消息表的记录发送给MQ通知库存服务执行减库存操作。
4、库存服务执行减少库存,并且记录执行消息状态(为避免重复执行消息,在执行减库存之前查询是否执行过此消息)。
5、库存服务向MQ发送完成减少库存的消息。
6、订单服务接收到完成库存减少的消息后删除原来添加的“减少库存任务消息”。实现最终事务一致要求:预留资源成功理论上要求正式执行成功,如果执行失败会进行重试,要求业务执行方法实现幂等。
优缺点
优点 :
由MQ按异步的方式协调完成事务,性能较高。
不用实现try/confirm/cancel接口,开发成本比TCC低。
缺点:
此方式基于关系数据库本地事务来实现,会出现频繁读写数据库记录,浪费数据库资源,另外对于高并发操作不是最佳方案。
2.4 全局事务DTP
全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/OpenDistributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种角色:
AP: Application 应用系统 (微服务)
TM: Transaction Manager 事务管理器 (全局事务管理):负责分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚等。
RM: Resource Manager 资源管理器 (数据库)
CRM: 通信资源管理器 控制一个TM域(TM domain)内或者跨TM域的分布式应用之间的通信。
CP: 通信协议
整个事务分成两个阶段:
阶段一: 表决阶段,所有参与者都将本事务执行预提交,并将能否成功的信息反馈发给协调者。9
阶段二: 执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。1
2.5 最大努力通知
最大努力通知也被称为定期校对,其实是对第二种解决方案的进一步优化。它引入了本地消息表来记录错误消息,然后加入失败消息的定期校对功能,来进一步保证消息会被下游系统消费
第一步: 消息由系统A投递到中间件
- 处理业务的同一事务中,向本地消息表中写入一条记录
- 准备专门的消息发送者不断地发送本地消息表中的消息到消息中间件,如果发送失败则重试
第二步: 消息由中间件投递到系统B 1. 消息中间件收到消息后负责将该消息同步投递给相应的下游系统,并触发下游系统的任务执行
- 当下游系统处理成功后,向消息中间件反馈确认应答,消息中间件便可以将该条消息删除,从而该事务完成
- 对于投递失败的消息,利用重试机制进行重试,对于重试失败的,写入错误消息表
- 消息中间件需要提供失败消息的查询接口,下游系统会定期查询失败消息,并将其消费 这种方式的优缺点: 优点: 一种非常经典的实现,实现了最终一致性。 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。
具体包括:
1、有一定的消息重复通知机制。
因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。
如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息 信息来满足需求
最大努力通知与可靠消息一致性有什么不同?
1、解决方案思想不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知 方来保证。
最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接 收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2、两者的业务应用场景不同
可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消 息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。
解决方案
通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。
方案1:
本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:
1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。 注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。(后边会讲)
2、接收通知方监听 MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用 rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
5、接收通知方可通过消息校对接口来校对消息的一致性。
方案2:
本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知
交互流程如下:
1、发起通知方将通知发给MQ。 使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。
2、通知程序监听 MQ,接收MQ的消息。 方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。通知程序若没有回应ack则MQ会重复通知。
3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。 通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消 息。
4、接收通知方可通过消息校对接口来校对消息的一致性。
方案1和方案2的不同点:
1、方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。
2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收 通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。
RocketMQ实现最大努力通知型事务
添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>配置rocketMQ
rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876业务实现
package cn.itcast.dtx.notifymsg.pay.service.impl;
import cn.itcast.dtx.notifymsg.pay.dao.AccountPayDao;
import cn.itcast.dtx.notifymsg.pay.entity.AccountPay;
import cn.itcast.dtx.notifymsg.pay.service.AccountPayService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {
@Autowired
AccountPayDao accountPayDao;
@Autowired
RocketMQTemplate rocketMQTemplate;
//插入充值记录
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
if(success>0){
//发送通知,使用普通消息发送通知
accountPay.setResult("success");
rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
return accountPay;
}
return null;
}
//查询充值记录,接收通知方调用此方法来查询充值结果
@Override
public AccountPay getAccountPay(String txNo) {
AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
return accountPay;
}
}服务2监听mq
package cn.itcast.dtx.notifydemo.bank1.message;
import cn.itcast.dtx.notifydemo.bank1.entity.AccountPay;
import cn.itcast.dtx.notifydemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.notifydemo.bank1.service.AccountInfoService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author Administrator
* @version 1.0
**/
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {
@Autowired
AccountInfoService accountInfoService;
//接收消息
@Override
public void onMessage(AccountPay accountPay) {
log.info("接收到消息:{}", JSON.toJSONString(accountPay));
if("success".equals(accountPay.getResult())){
//更新账户金额
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
accountInfoService.updateAccountBalance(accountChangeEvent);
}
log.info("处理消息完成:{}", JSON.toJSONString(accountPay));
}
}业务逻辑
package cn.itcast.dtx.notifydemo.bank1.service.impl;
import cn.itcast.dtx.notifydemo.bank1.dao.AccountInfoDao;
import cn.itcast.dtx.notifydemo.bank1.entity.AccountPay;
import cn.itcast.dtx.notifydemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.notifydemo.bank1.service.AccountInfoService;
import cn.itcast.dtx.notifydemo.bank1.spring.PayClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Administrator
* @version 1.0
**/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
PayClient payClient;
//更新账户金额
@Override
@Transactional
public void updateAccountBalance(AccountChangeEvent accountChange) {
//幂等校验
if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){
return ;
}
int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
//插入事务记录,用于幂等控制
accountInfoDao.addTx(accountChange.getTxNo());
}
//远程调用查询充值结果
@Override
public AccountPay queryPayResult(String tx_no) {
//远程调用
AccountPay payresult = payClient.payresult(tx_no);
if("success".equals(payresult.getResult())){
//更新账户金额
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号
accountChangeEvent.setAmount(payresult.getPayAmount());//金额
accountChangeEvent.setTxNo(payresult.getId());//充值事务号
updateAccountBalance(accountChangeEvent);
}
return payresult;
}
}最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务; 最大努力通知方案需要实现如下功能: 1、消息重复通知机制。 2、消息校对机制。
2.6 分布式事务对比分析
2PC 最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其 阻塞机制和最差时间复杂度高, 因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并 发较高以及子事务生命周期较长 (long-running transactions) 的分布式服务中。
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使 得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现 try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实 现不同的回滚策略。典型的使用场景:满,登录送优惠券等。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注 册送积分,登录送优惠券等。
最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业 务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后 续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果 通知等。
| 2PC | TCC | 可靠消息 | 最大努力通知 | |
|---|---|---|---|---|
| 一致性 | 强一致性 | 最终一致 | 最终一致 | 最终一致 |
| 吞吐量 | 低 | 中 | 高 | 高 |
| 实现复杂度 | 易 | 难 | 中 | 易 |
总结
在条件允许的情况下,我们尽可能选择本地事务单数据源,因为它减少了网络交互带来的性能损耗,且避免了数据 弱一致性带来的种种问题。若某系统频繁且不合理的使用分布式事务,应首先从整体设计角度观察服务的拆分是否 合理,是否高内聚低耦合?是否粒度太小?分布式事务一直是业界难题,因为网络的不确定性,而且我们习惯于拿 分布式事务与单机事务ACID做对比。
无论是数据库层的XA、还是应用层TCC、可靠消息、最大努力通知等方案,都没有完美解决分布式事务问题,它们 不过是各自在性能、一致性、可用性等方面做取舍,寻求某些场景偏好下的权衡。
三、Atomikos
xa--jta---具体实现的一种Atomikos.
核心步骤
Atomikos步骤分析:
1.公共数据源代理的配置
1)配置两个数据源
2)xa数据源统一管理
3)分别配置两个sessionFactoryBean
4)管理各自的mapper
2.配置全局事务管理器
3.将全局事务管理器集成到spring之中
4.切面管理配置和事务通知管理
5.将切入点和事务通知关联起来
步骤
添加依赖
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-util</artifactId>
<version>${atomikos.version}</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions</artifactId>
<version>${atomikos.version}</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jta</artifactId>
<version>${atomikos.version}</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>${atomikos.version}</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-api</artifactId>
<version>${atomikos.version}</version>
</dependency>配置多个数据源
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://127.0.0.1:3306/order?useUnicode=true&characterEncoding=utf8&autoReconnect=true
jdbc.username=root
#jdbc.pwd=123456
jdbc.pwd=
jdbc.log.driver=com.mysql.jdbc.Driver
jdbc.log.url=jdbc:mysql://127.0.0.1:3306/log?useUnicode=true&characterEncoding=utf8&autoReconnect=true
jdbc.log.username=root
#jdbc.log.pwd=123456
jdbc.log.pwd=配置多个数据源
参考数据源配置配置公共事务管理器
<!-- 配置atomikos事务管理器 -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown" value="false"/>
</bean>
<!--配置本地事务管理器-->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300000"/>
</bean>
<!--JTA事务管理器-->
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionM anager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager"/>
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction"/>
</property>
<property name="allowCustomIsolationLevels" value="true"/>
</bean>配置事务通知
<!--@Aspect-->
<aop:aspectj-autoproxy/>
<!--使用CGLIB动态代理-->
<tx:annotation-driven transaction- manager="springTransactionManager" proxy-target- class="true" />
<!--配置事务的通知-->
<!-- the transactional advice (what 'happens'; see the <aop:advisor/> bean below) 事务传播特性配置 -->
<tx:advice id="txAdvice" transaction- manager="springTransactionManager">
<!-- the transactional semantics... -->
<tx:attributes>
<tx:method name="add*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<tx:method name="save*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<tx:method name="insert*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<tx:method name="update*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<tx:method name="modify*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<tx:method name="delete*" propagation="REQUIRED" isolation="DEFAULT" rollback-for="java.lang.Exception" />
<!-- 查询方法 -->
<tx:method name="query*" read-only="true" />
<tx:method name="select*" read-only="true" />
<tx:method name="find*" read-only="true" />
</tx:attributes>
</tx:advice>织入业务
<!-- 声明式事务AOP配置 -->
<aop:config>
<!--配合aop的切入点-->
<aop:pointcut expression="execution(* com.itheimabk01.service.impl.*.*(..))" id="tranpointcut" />
<!--生命式事务通知,配置事务通知和切入点关系-->
<aop:advisor advice-ref="txAdvice" pointcut- ref="tranpointcut" />
</aop:config>LCN
核心步骤
1.建立eureka注册中心、并启动(Tx-manager是一个微服务,需要注册到eureka中)
2.启动redis(存储事务的状态记录)
3.TX-manager项目修改redis地址和端口
项目发起方:
pom文件引入lcn的jar包
启动类中添加数据源代理
添加mapper的扫描包地址
添加与tx-manager的通讯模块(tx-client)Tc模块
在配置文件中添加tx-manager的地址(通讯用)
发起方业务方法上添加@TxTransaction(isStart=true)注解
事务参与方:
pom文件引入lcn的jar包,以及druid的jar包(数据库代理服务)
启动类中添加数据源代理,接管数据库的会话
配置mapper的扫描包地址
添加与tx-manager的通讯模块(TC)
在配置文件中添加tx-manager的地址(通讯用) http://127.0.0.1:8899/tx/manager/
业务全局事务的参与方的业务方法上添加@TxTransaction注解
四、分布式框架选择
TCC (Try-Confirm-Cancle)
适用场景:严格一致性 执行时间短 实时性要求高
举例: 红包, 收付款, 实时汇款业务.
异步确保型
适用场景:执行周期较长、实时性要求不高
例如: 非实时汇款业务、退货/退款业务
最大努力通知型
优点:高并发, 低耦合
缺点:不支持回滚;
适用场景: 交易结果消息的通知等